In this chapter, we explain how to use StreamInsight Extensions for OPC Data Client, using one of the examples that come with the product: the SimpleDAStreamInsightApplication project in C#. This example uses OPC Data Access; the examples for OPC Alarms and Events and for OPC Unified Architecture are quite similar.
To make the specifics of StreamInsight Extensions for OPC Data Client easy to understand, and to stay away from the parts that are common to all StreamInsight applications, we have created this set of simple examples by deriving them from the examples that Microsoft uses to teach StreamInsight. We can thus avoid explaining information that is already available from Microsoft.
Our example exposes an embedded server from inside a console application, and also runs the query and displays the results. It is based on Microsoft’s “StreamInsight Example: Server - Exposing an Embedded Server”, http://msdn.microsoft.com/en-us/library/hh995352(v=sql.111).aspx . We recommend that you study Microsoft’s example first.
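The first steps are the same as in Microsoft’s example: an embedded StreamInsight server is created, an endpoint for its management service is opened, and a StreamInsight application is created in the server. The code that follows refers to this application as myApp.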
In the next step, an input source is defined and deployed to the (StreamInsight) server with a name so that it can be used by other StreamInsight clients. In this example, the data is a simple temporal stream of point events generated by the OPC Data Access server.
The DAItemChangedObservable.Create<int> method creates an observable that generates a sequence of EasyDAItemChangedEventArgs<int> objects. Each notification contains, among other information, the value of a given integer OPC item (if available), as delivered by the OPC server. Because the EasyDAItemChangedEventArgs<int> objects are not directly suitable as StreamInsight event payloads, we convert them to DAItemChangedPayload<int>, which is a pre-made payload class provided by StreamInsight Extensions for OPC Data Client.
Define and Deploy a Source

    // DEFINE a simple SOURCE (returns a point event every second)
    const string machineName = "";
    const string serverClass = "OPCLabs.KitServer.2";
    const string itemId = "Simulation.Incrementing (1 s)";
    var observable = DAItemChangedObservable.Create<int>(machineName, serverClass, itemId, 100);
    var mySource = myApp
        .DefineObservable(() => observable)
        .ToPointStreamable(
            eventArgs => PointEvent.CreateInsert(DateTimeOffset.Now, (DAItemChangedPayload<int>)eventArgs),
            AdvanceTimeSettings.StrictlyIncreasingStartTime);

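The cast in the projection relies on a conversion from the event-arguments type to the payload type. As a rough illustration of how such a cast can work in C#, the sketch below defines an explicit conversion operator between two stand-in classes. ItemChangedArgs<T> and ItemChangedPayload<T> are hypothetical names invented for this sketch; they are not the OPC Labs types, and the product may implement its conversion differently.

```csharp
using System;

// Hypothetical stand-in for a rich event-arguments class (NOT the OPC Labs type).
public sealed class ItemChangedArgs<T>
{
    public ItemChangedArgs(T value, DateTime timestamp)
    {
        Value = value;
        Timestamp = timestamp;
    }

    public T Value { get; }
    public DateTime Timestamp { get; }
}

// Hypothetical stand-in for a flat payload class (NOT the OPC Labs type).
public sealed class ItemChangedPayload<T>
{
    public T Value { get; set; }
    public DateTime Timestamp { get; set; }

    // Enables the cast syntax: (ItemChangedPayload<T>)args
    public static explicit operator ItemChangedPayload<T>(ItemChangedArgs<T> args)
    {
        return new ItemChangedPayload<T> { Value = args.Value, Timestamp = args.Timestamp };
    }
}

public static class ConversionDemo
{
    public static void Main()
    {
        var args = new ItemChangedArgs<int>(42, DateTime.UtcNow);

        // The cast invokes the explicit operator and copies the data into a new payload.
        var payload = (ItemChangedPayload<int>)args;
        Console.WriteLine(payload.Value);
    }
}
```

With a conversion like this in place, a projection can turn each notification into a payload with a single cast, which is the shape used in the ToPointStreamable call.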
Next, compose a query over the input source. The query uses LINQ as the query specification language. In this example, the query returns the events where the value of the OPC item is an even number.
Compose a Query over the Source

    // Compose a QUERY over the source
    // (return every event carrying even data value)
    var myQuery = from e in mySource
                  where e.VtqPayload.Value % 2 == 0
                  select e;

Technically, this definition translates to a filter operator that drops from the sequence all events that do not fulfill the filter predicate (where e.VtqPayload.Value % 2 == 0) and returns the remaining events unchanged. For more information about LINQ query operators, see “Using StreamInsight LINQ” in Microsoft’s StreamInsight documentation.
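To see the effect of the filter predicate in isolation, the following sketch applies the same even-value test to an ordinary in-memory sequence using LINQ to Objects. It does not use StreamInsight or OPC at all; the EvenFilterDemo class, the KeepEven method, and the sample values are made up for illustration and merely stand in for successive values of the incrementing OPC item.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EvenFilterDemo
{
    // Same predicate as in the StreamInsight query, applied to plain integers.
    public static IEnumerable<int> KeepEven(IEnumerable<int> values)
    {
        return from v in values
               where v % 2 == 0
               select v;
    }

    public static void Main()
    {
        // Stand-in for successive values of the incrementing OPC item.
        int[] itemValues = { 1, 2, 3, 4, 5, 6 };

        // Only the even values pass the filter.
        foreach (int v in KeepEven(itemValues))
            Console.WriteLine("kept: {0}", v);
    }
}
```

The StreamInsight query has the same shape, except that it operates on a temporal stream of events and tests the Value carried in each event's VtqPayload.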
Next, an output sink is created that can be bound to the query and can process the resulting sequence. In this example, the sink is a simple function that writes the stream values to the console. The DAItemChangedPayload<int> class has a convenient ToString() method that returns the relevant payload information, formatted for output.
Define and Deploy a Sink

    // DEFINE a simple observer SINK (writes the value to the server console)
    var mySink = myApp.DefineObserver(() =>
        Observer.Create<DAItemChangedPayload<int>>(
            payload => Console.WriteLine("sink_Server..: {0}", payload)));

The sink is then deployed to the server with a name, in the same way as in the original Microsoft example.
The remainder of the example does nothing specific to OPC. The observable query is bound to the observer output sink, and the query is then run in a process in the server. This process continues to run until the user stops it by typing in the console.